feat: add Delta Lake support #89

Merged (1 commit, Dec 28, 2022)
Conversation

@mikix (Contributor) commented Dec 5, 2022

Description

Pass --output-format=deltalake to enable. And make sure your AWS Glue is set up to support it.

This is not enabled by default (yet) and this commit does not yet do anything clever with incremental bulk exports. But this is a beginning to build upon.

Some ugly parts of this PR:

  • Java. We are now (behind the scenes, via PySpark) downloading jars and running Java code when using Delta Lake support. There's a native Rust implementation with Python bindings, but it's not feature complete yet.
  • Delta Lake is VERY SLOW. Even with toy data. Which is annoying in prod and also annoying when running pytest.
  • Stdout suppression. PySpark is crazy noisy on the console, so I resorted to a bit of a hack to keep it quiet (see deltalake.py for my sins; a rough sketch of the trick appears below).

Relates to #75 and provides a solution for #76.
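
The stdout-suppression hack mentioned above is essentially the classic file-descriptor dance: the JVM that PySpark spawns writes to the raw descriptors, so redirecting sys.stdout alone doesn't help. A minimal sketch of the general technique (not the exact code in deltalake.py), consistent with the os.dup2 call visible in the diff below:

```python
import contextlib
import os


@contextlib.contextmanager
def quiet_fds():
    """Temporarily point stdout/stderr (fds 1 and 2) at /dev/null.

    Redirecting sys.stdout isn't enough, because the JVM that PySpark
    spawns writes directly to the raw file descriptors.
    """
    saved_out = os.dup(1)
    saved_err = os.dup(2)
    devnull = os.open(os.devnull, os.O_WRONLY)
    try:
        os.dup2(devnull, 1)
        os.dup2(devnull, 2)
        yield
    finally:
        os.dup2(saved_out, 1)
        os.dup2(saved_err, 2)
        os.close(devnull)
        os.close(saved_out)
        os.close(saved_err)
```

Wrapping the noisy Spark calls in `with quiet_fds(): ...` keeps the console clean while still restoring the original descriptors afterward.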

Checklist

  • Consider if documentation (like in docs/) needs to be updated
  • Consider if tests should be added

@mikix mentioned this pull request on Dec 5, 2022
@mikix changed the title from "feat: add ACI data lake support" to "feat: add ACID data lake support" on Dec 12, 2022
@mikix force-pushed the mikix/acidlake branch 2 times, most recently from 56b1f62 to e015953 on Dec 21, 2022
@mikix changed the title from "feat: add ACID data lake support" to "feat: add Delta Lake support" on Dec 21, 2022
try:
    self.root.rm(parent_dir, recursive=True)
except FileNotFoundError:
    pass

try:
-   full_path = self.root.joinpath(f'{path}.{batch:03}.{self.suffix}')
+   full_path = self.root.joinpath(f'{dbname}/{dbname}.{batch:03}.{self.suffix}')
@mikix (author) commented on the diff:

This change right here renames our output files from condition/fhir_conditions.000.ndjson to condition/condition.000.ndjson. Not an important part of this PR, but it made it easier to handle all output tables the same way. It did cause a lot of file/test churn though, sorry.

os.dup2(stderr, 2)


class DeltaLakeFormat(AthenaFormat):
@mikix (author) commented on the diff:

I'm not proud of much of this class: from suppressing output, to the list of jars and versions, to converting fsspec S3 params to Hadoop S3 params. But it does seem to technically work...
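
For the fsspec-to-Hadoop piece, the translation amounts to a key rename. A sketch under the assumption that the inputs are the usual s3fs arguments (key, secret, token, client_kwargs.region_name) and the outputs are the standard Hadoop s3a config keys (not the PR's actual code):

```python
def fsspec_to_hadoop_s3(fsspec_options: dict) -> dict:
    """Translate common s3fs/fsspec arguments into Hadoop s3a config keys."""
    mapping = {
        "key": "fs.s3a.access.key",
        "secret": "fs.s3a.secret.key",
        "token": "fs.s3a.session.token",
    }
    hadoop = {}
    for fsspec_name, hadoop_name in mapping.items():
        if fsspec_options.get(fsspec_name):
            hadoop[hadoop_name] = fsspec_options[fsspec_name]
    region = fsspec_options.get("client_kwargs", {}).get("region_name")
    if region:
        # fs.s3a.endpoint.region needs a reasonably recent Hadoop (3.3.2+)
        hadoop["fs.s3a.endpoint.region"] = region
    return hadoop
```

Each resulting key would then be handed to the Spark session, e.g. via builder.config(f"spark.hadoop.{name}", value), which propagates it into the Hadoop configuration.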

A reviewing contributor commented:

How would you feel about separating the write and merge? Like, just have the ETL process create the data and push it to S3, and have some kind of service in AWS pick up those files and do the Delta Lake insertion?

@mikix (author) replied:

Sorry, can you explain the intent there? Separating out the gross Java-requiring code into another step of the process, or is there a fancy AWS thing you're thinking of that saves us overall code, or something else?

I'm leery of the control we'd be giving up there. Like, in the short term, I'm going to add support for the FHIR server telling us which resources have been deleted since we last exported. How might that look with what you're proposing?

The contributor replied:

Grosser - you'd have to set up some way to call an API or otherwise execute a query.

But doing the data lake ingest in AWS, you might be able to use a Glue job rather than worry about hand-managing it: https://aws.amazon.com/marketplace/pp/prodview-seypofzqhdueq - it might be easier overall?

@mikix (author) replied:

For this, I think I prefer hand-managing: more control over the flow (like deleting), less AWS dependency, and fewer moving parts / less infrastructure. And the cost feels light -- basically 20 lines of "upsert or create" (sketched below).
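
A hedged sketch of what that "upsert or create" core could look like using the delta-spark DeltaTable API (the function name, table path, and the `id` merge column are illustrative assumptions, not the PR's actual code):

```python
from delta.tables import DeltaTable
from pyspark.sql.utils import AnalysisException


def update_delta_table(spark, updates_df, path: str) -> None:
    """Upsert updates_df into the Delta table at `path`, creating it on first run."""
    try:
        # forPath raises AnalysisException if no Delta table exists here yet
        table = DeltaTable.forPath(spark, path)
        (
            table.alias("table")
            .merge(updates_df.alias("updates"), "table.id = updates.id")  # merge key assumed
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )
    except AnalysisException:
        # First write: no table yet, so just create one
        updates_df.write.format("delta").save(path)
```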

Are your trade-off sliders set differently, or were you musing aloud?

The contributor replied:

Mostly musing, especially given the frequency we're running jobs at.

-   python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"]
+   python-version: ["3.7", "3.8", "3.9", "3.10"]
@mikix (author) commented on the diff:

OK, how do we feel about this? PySpark 3.4 officially supports py3.11, but delta-spark is still pinned to 3.3.

So... how much do we feel the need to support py3.11? Our Docker image right now is still on 3.10, so this doesn't affect our shipped artifacts yet.

The contributor replied:

I generally have no interest in being leading edge - but we should watch it.
